源码分析Dubbo异步调用与事件回调机制 | 您所在的位置:网站首页 › dubbo同步调用 异步调用 › 源码分析Dubbo异步调用与事件回调机制 |
本文将详细分析Dubbo服务异步调用与事件回调机制。 1、异步调用与事件回调机制 1.1 异步回调 group=CONSUMER说明该过滤器属于消费端过滤器。 接下来从从invoke方法详细分析其实现逻辑。 public Result invoke(final Invoker invoker, final Invocation invocation) throws RpcException { final boolean isAsync = RpcUtils.isAsync(invoker.getUrl(), invocation); // @1 fireInvokeCallback(invoker, invocation); // @2 // need to configure if there's return value before the invocation in order to help invoker to judge if it's // necessary to return future. Result result = invoker.invoke(invocation); // @3 if (isAsync) { asyncCallback(invoker, invocation); // @4 } else { syncCallback(invoker, invocation, result); // @5 } return result; }代码@1:首先从URL中获取是否是异步调用标志,其配置属性为< dubbo:service async=""/>获取其子标签< dubbo:method async=""/>。 代码@2:同步调用oninvoke事件,执行invoke方法之前的事件。 代码@3:继续沿着调用链调用,最终会到具体的协议Invoker,例如DubboInvoker,发生具体的服务调用,跟踪一下同步、异步调用的实现细节。 代码@4:如果调用方式是异步模式,则异步调用onreturn或onthrow事件。 代码@5:如果调用方式是同步模式,则同步调用onreturn或onthrow事件。 2.1 源码分析FutureFilter#fireInvokeCallback private void fireInvokeCallback(final Invoker invoker, final Invocation invocation) { final Method onInvokeMethod = (Method) StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_INVOKE_METHOD_KEY)); // @1 final Object onInvokeInst = StaticContext.getSystemContext().get(StaticContext.getKey(invoker.getUrl(), invocation.getMethodName(), Constants.ON_INVOKE_INSTANCE_KEY)); // @2 if (onInvokeMethod == null && onInvokeInst == null) { // @3 return; } if (onInvokeMethod == null || onInvokeInst == null) { // @4 throw new IllegalStateException("service:" + invoker.getUrl().getServiceKey() + " has a onreturn callback config , but no such " + (onInvokeMethod == null ? "method" : "instance") + " found. url:" + invoker.getUrl()); } if (!onInvokeMethod.isAccessible()) { onInvokeMethod.setAccessible(true); } Object[] params = invocation.getArguments(); try { onInvokeMethod.invoke(onInvokeInst, params); // @5 } catch (InvocationTargetException e) { fireThrowCallback(invoker, invocation, e.getTargetException()); // @6 } catch (Throwable e) { fireThrowCallback(invoker, invocation, e); // @7 } } 代码@1:StaticContext.getSystemContext()中根据key:serviceKey + “.” + method + “.” + “oninvoke.method” 获取配置的oninvoke.method方法名。其中serviceKey为[group]/interface:[version],其中group与version可能为空,忽略。 代码@2:同样根据key:serviceKey + “.” + method + “.” + “oninvoke.instance” 从StaticContext.getSystemContext()获取oninvoke.method方法所在的实例名对象,也就是说该调用哪个对象的oninvoke.method指定的方法。这里就有一个疑问,这些数据是在什么时候存入StaticContext中的呢?下文会详细分析。 代码@3、@4:主要检测< dubbo:method oninvoke=""/>配置的正确性,其正确的配置方式如下:“实例名.方法名”,例如: 代码@1:首先获取async属性,如果为true表示异步请求,如果配置了return="false"表示调用模式为oneway,只发调用,不关注其调用结果。 代码@2:处理oneway的情况。如果设置了sent=true,表示等待网络数据发出才返回,如果sent=false,只是将待发送数据发到IO写缓存区就返回。 代码@3:处理异步的情况,代码@4处理同步调用的情况,细看其实都是通过调用网络客户端client的request,最终调用HeaderExchangeChannel#request方法: 2.3 源码分析asyncCallback与syncCallback 前面介绍了方法执行之前oninvoker事件的调用分析,接下来分析RPC服务调用完成后,onreturn和onthrow方法的调用逻辑。 异步回调与同步回调的区别就是调用onreturn(fireReturnCallback)和onthrow(fireThrowCallback)调用的地方不同,如果是同步调用,也就是在完成RPC服务调用后,立即调用相关的回调方法,如果是异步调用的话,RPC服务完成后,通过Future模式异步执行。其实关于onreturn、onthrow属性的解析,执行与oninvoker属性的解析完全一样,再这里也就不重复介绍了。 欢迎加笔者微信号(dingwpmz),加群探讨,笔者优质专栏目录: 1、源码分析RocketMQ专栏(40篇+) 2、源码分析Sentinel专栏(12篇+) 3、源码分析Dubbo专栏(28篇+) 4、源码分析Mybatis专栏 5、源码分析Netty专栏(18篇+) 6、源码分析JUC专栏 7、源码分析Elasticjob专栏 8、Elasticsearch专栏(20篇+) 9、源码分析MyCat专栏 |
CopyRight 2018-2019 实验室设备网 版权所有 |